/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.crypto.SecretKey;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputIndex;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager.ReservePreference;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

class Fetcher<K, V> extends Thread {

    private static final Log LOG = LogFactory.getLog(Fetcher.class);

    /** Number of ms before timing out a copy */
    private static final int DEFAULT_STALLED_COPY_TIMEOUT = 3 * 60 * 1000;

    /** Basic/unit connection timeout (in milliseconds) */
    private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;

    /* Default read timeout (in milliseconds) */
    private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;

    private final Progressable reporter;

    private static enum ShuffleErrors {
        IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE
    }

    private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

    private final Counters.Counter connectionErrs;

    private final Counters.Counter ioErrs;

    private final Counters.Counter wrongLengthErrs;

    private final Counters.Counter badIdErrs;

    private final Counters.Counter wrongMapErrs;

    private final Counters.Counter wrongReduceErrs;

    private final MergeManager<K, V> merger;

    private final ShuffleScheduler<K, V> scheduler;

    private final ShuffleClientMetrics metrics;

    private final ExceptionReporter exceptionReporter;

    private final int id;

    private static int nextId = 0;

    private final int reduce;

    private final int connectionTimeout;

    private final int readTimeout;

    // Decompression of map-outputs
    private final CompressionCodec codec;

    private final Decompressor decompressor;

    private final SecretKey jobTokenSecret;
    
    
    // SKEWREDUCE
    private final String jobOutPath;
    private final FileSystem jobFs;

    public Fetcher(JobConf job, TaskAttemptID reduceId,
            ShuffleScheduler<K, V> scheduler, MergeManager<K, V> merger,
            Reporter reporter, ShuffleClientMetrics metrics,
            ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) throws IOException {
        this.reporter = reporter;
        this.scheduler = scheduler;
        this.merger = merger;
        this.metrics = metrics;
        this.exceptionReporter = exceptionReporter;
        this.id = ++nextId;
        this.reduce = reduceId.getTaskID().getId();
        this.jobTokenSecret = jobTokenSecret;
        ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
                ShuffleErrors.IO_ERROR.toString());
        wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
                ShuffleErrors.WRONG_LENGTH.toString());
        badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
                ShuffleErrors.BAD_ID.toString());
        wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
                ShuffleErrors.WRONG_MAP.toString());
        connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
                ShuffleErrors.CONNECTION.toString());
        wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
                ShuffleErrors.WRONG_REDUCE.toString());

        if (job.getCompressMapOutput()) {
            Class<? extends CompressionCodec> codecClass = job
                    .getMapOutputCompressorClass(DefaultCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, job);
            decompressor = CodecPool.getDecompressor(codec);
        } else {
            codec = null;
            decompressor = null;
        }

        this.connectionTimeout = job.getInt(
                MRJobConfig.SHUFFLE_CONNECT_TIMEOUT,
                DEFAULT_STALLED_COPY_TIMEOUT);
        this.readTimeout = job.getInt(MRJobConfig.SHUFFLE_READ_TIMEOUT,
                DEFAULT_READ_TIMEOUT);

        Path p = FileOutputFormat.getOutputPath(job);
        jobOutPath = p == null ? null : p.toString();
        jobFs = p.getFileSystem(job);
        
        setName("fetcher#" + id);
        setDaemon(true);
    }

    public void run() {
        try {
            while (true) {
                MapHost host = null;
                FetchTask<K,V> task = null;
                try {
                    // If merge is on, block
                    merger.waitForInMemoryMerge();

                    // Get a host to shuffle from
                    ShuffleScheduler<K,V>.NextFetch nextFetch = scheduler.getNext();
                    if ( nextFetch.hasMapHost() ) {
                        host = nextFetch.getMapHost();
                        metrics.threadBusy();
    
                        // Shuffle
                        copyFromHost(host);
                    } else {
                        task = nextFetch.getFetchTask();
                        if ( task.isAborted() ) {
                            // already canceled.
                            continue;
                        }
                        
                        metrics.threadBusy();
                        
                        // Shuffle
                        copyFromHdfs(task);
                    }
                } finally {
                    if (host != null) {
                        scheduler.freeHost(host);
                    }
                    if ( host != null || task != null ) {
                        metrics.threadFree();
                    }
                }
            }
        } catch (InterruptedException ie) {
            return;
        } catch (Throwable t) {
            exceptionReporter.reportException(t);
        }
    }

    /**
     * The crux of the matter...
     * 
     * @param host
     *            {@link MapHost} from which we need to shuffle available
     *            map-outputs.
     */
    private void copyFromHost(MapHost host) throws IOException {
        // Get completed maps on 'host'
        List<TaskAttemptID> maps = scheduler.getMapsForHost(host);

        // Sanity check to catch hosts with only 'OBSOLETE' maps,
        // especially at the tail of large jobs
        if (maps.size() == 0) {
            return;
        }

        LOG.debug("Fetcher " + id + " going to fetch from " + host);
        for (TaskAttemptID tmp : maps) {
            LOG.debug(tmp);
        }
        
        // FIXME exclude any completed map outputs

        // List of maps to be fetched yet
        Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);

        // Construct the url and connect
        DataInputStream input;
        boolean connectSucceeded = false;

        try {
            URL url = getMapOutputURL(host, maps);
            URLConnection connection = url.openConnection();

            // generate hash of the url
            String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
            String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
                    jobTokenSecret);

            // put url hash into http header
            connection.addRequestProperty(
                    SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
            // set the read timeout
            connection.setReadTimeout(readTimeout);
            connect(connection, connectionTimeout);
            connectSucceeded = true;
            input = new DataInputStream(connection.getInputStream());

            // get the replyHash which is HMac of the encHash we sent to the
            // server
            String replyHash = connection
                    .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
            if (replyHash == null) {
                throw new IOException(
                        "security validation of TT Map output failed");
            }
            LOG.debug("url=" + msgToEncode + ";encHash=" + encHash
                    + ";replyHash=" + replyHash);
            // verify that replyHash is HMac of encHash
            SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
            LOG.info("for url=" + msgToEncode + " sent hash and receievd reply");
        } catch (IOException ie) {
            ioErrs.increment(1);
            LOG.warn(
                    "Failed to connect to " + host + " with "
                            + remaining.size() + " map outputs", ie);

            // If connect did not succeed, just mark all the maps as failed,
            // indirectly penalizing the host
            if (!connectSucceeded) {
                for (TaskAttemptID left : remaining) {
                    scheduler.copyFailed(left, host, connectSucceeded);
                }
            } else {
                // If we got a read error at this stage, it implies there was a
                // problem
                // with the first map, typically lost map. So, penalize only
                // that map
                // and add the rest
                TaskAttemptID firstMap = maps.get(0);
                scheduler.copyFailed(firstMap, host, connectSucceeded);
            }

            // Add back all the remaining maps, WITHOUT marking them as failed
            for (TaskAttemptID left : remaining) {
                scheduler.putBackKnownMapOutput(host, left);
            }

            return;
        }

        try {
            // Loop through available map-outputs and fetch them
            // On any error, good becomes false and we exit after putting back
            // the remaining maps to the yet_to_be_fetched list
            boolean good = true;
            while (!remaining.isEmpty() && good) {
                good = copyMapOutput(host, input, remaining);
            }

            IOUtils.cleanup(LOG, input);

            // Sanity check
            if (good && !remaining.isEmpty()) {
                throw new IOException(
                        "server didn't return all expected map outputs: "
                                + remaining.size() + " left.");
            }
        } finally {
            for (TaskAttemptID left : remaining) {
                scheduler.putBackKnownMapOutput(host, left);
            }
        }

    }

    private boolean copyMapOutput(MapHost host, DataInputStream input,
            Set<TaskAttemptID> remaining) {
        MapOutput<K, V> mapOutput = null;
        TaskAttemptID mapId = null;
        long decompressedLength = -1;
        long compressedLength = -1;

        try {
            long startTime = System.currentTimeMillis();
            int forReduce = -1;
            // Read the shuffle header
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                mapId = TaskAttemptID.forName(header.mapId);
                compressedLength = header.compressedLength;
                decompressedLength = header.uncompressedLength;
                forReduce = header.forReduce;
            } catch (IllegalArgumentException e) {
                badIdErrs.increment(1);
                LOG.warn("Invalid map id ", e);
                return false;
            }

            // Do some basic sanity verification
            if (!verifySanity(compressedLength, decompressedLength, forReduce,
                    remaining, mapId)) {
                return false;
            }

            LOG.debug("header: " + mapId + ", len: " + compressedLength + ", decomp len: " + decompressedLength);

            // Get the location for the map output - either in-memory or on-disk
            mapOutput = merger.reserve(mapId, decompressedLength, id);

            // Check if we can shuffle *now* ...
            if (mapOutput.getType() == Type.WAIT) {
                LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
                return false;
            }

            // Go!
            LOG.info("fetcher#" + id + " about to shuffle output of map "
                    + mapOutput.getMapId() + " decomp: " + decompressedLength
                    + " len: " + compressedLength + " to "
                    + mapOutput.getType());
            if (mapOutput.getType() == Type.MEMORY) {
                shuffleToMemory(host, mapOutput, input,
                        (int) decompressedLength, (int) compressedLength);
            } else {
                shuffleToDisk(host, mapOutput, input, compressedLength);
            }

            // Inform the shuffle scheduler
            long endTime = System.currentTimeMillis();
            scheduler.copySucceeded(mapId, host, compressedLength, endTime - startTime, mapOutput);
            // Note successful shuffle
            remaining.remove(mapId);
            metrics.successFetch();
            return true;
        } catch (IOException ioe) {
            ioErrs.increment(1);
            if (mapId == null || mapOutput == null) {
                LOG.info("fetcher#" + id + " failed to read map header" + mapId
                        + " decomp: " + decompressedLength + ", "
                        + compressedLength, ioe);
                return false;
            }

            LOG.info("Failed to shuffle output of " + mapId + " from "
                        + host.getHostName(), ioe);

            // Inform the shuffle-scheduler
            mapOutput.abort();
            scheduler.copyFailed(mapId, host, true);
            metrics.failedFetch();
            return false;
        }

    }

    /**
     * Do some basic verification on the input received -- Being defensive
     * 
     * @param compressedLength
     * @param decompressedLength
     * @param forReduce
     * @param remaining
     * @param mapId
     * @return true/false, based on if the verification succeeded or not
     */
    private boolean verifySanity(long compressedLength,
            long decompressedLength, int forReduce,
            Set<TaskAttemptID> remaining, TaskAttemptID mapId) {
        if (compressedLength < 0 || decompressedLength < 0) {
            wrongLengthErrs.increment(1);
            LOG.warn(getName() + " invalid lengths in map output header: id: "
                    + mapId + " len: " + compressedLength + ", decomp len: "
                    + decompressedLength);
            return false;
        }

        if (forReduce != reduce) {
            wrongReduceErrs.increment(1);
            LOG.warn(getName() + " data for the wrong reduce map: " + mapId
                    + " len: " + compressedLength + " decomp len: "
                    + decompressedLength + " for reduce " + forReduce);
            return false;
        }

        // Sanity check
        if (!remaining.contains(mapId)) {
            wrongMapErrs.increment(1);
            LOG.warn("Invalid map-output! Received output for " + mapId);
            return false;
        }

        return true;
    }

    /**
     * Create the map-output-url. This will contain all the map ids separated by
     * commas
     * 
     * @param host
     * @param maps
     * @return
     * @throws MalformedURLException
     */
    private URL getMapOutputURL(MapHost host, List<TaskAttemptID> maps)
            throws MalformedURLException {
        // Get the base url
        StringBuffer url = new StringBuffer(host.getBaseUrl());

        boolean first = true;
        for (TaskAttemptID mapId : maps) {
            if (!first) {
                url.append(",");
            }
            url.append(mapId);
            first = false;
        }

        LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
        return new URL(url.toString());
    }

    /**
     * The connection establishment is attempted multiple times and is given up
     * only on the last failure. Instead of connecting with a timeout of X, we
     * try connecting with a timeout of x < X but multiple times.
     */
    private void connect(URLConnection connection, int connectionTimeout)
            throws IOException {
        int unit = 0;
        if (connectionTimeout < 0) {
            throw new IOException("Invalid timeout " + "[timeout = "
                    + connectionTimeout + " ms]");
        } else if (connectionTimeout > 0) {
            unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
        }
        // set the connect timeout to the unit-connect-timeout
        connection.setConnectTimeout(unit);
        while (true) {
            try {
                connection.connect();
                break;
            } catch (IOException ioe) {
                // update the total remaining connect-timeout
                connectionTimeout -= unit;

                // throw an exception if we have waited for timeout amount of
                // time
                // note that the updated value if timeout is used here
                if (connectionTimeout == 0) {
                    throw ioe;
                }

                // reset the connect timeout for the last try
                if (connectionTimeout < unit) {
                    unit = connectionTimeout;
                    // reset the connect time out for the final connect
                    connection.setConnectTimeout(unit);
                }
            }
        }
    }

    private void shuffleToMemory(MapHost host, MapOutput<K, V> mapOutput,
            InputStream input, int decompressedLength, int compressedLength)
            throws IOException {
        IFileInputStream checksumIn = new IFileInputStream(input,
                compressedLength);

        input = checksumIn;

        // Are map-outputs compressed?
        if (codec != null) {
            decompressor.reset();
            input = codec.createInputStream(input, decompressor);
        }

        // Copy map-output into an in-memory buffer
        byte[] shuffleData = mapOutput.getMemory();

        try {
            IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
            metrics.inputBytes(shuffleData.length);
            reporter.progress();
            LOG.info("Read " + shuffleData.length
                    + " bytes from map-output for " + mapOutput.getMapId());
        } catch (IOException ioe) {
            // Close the streams
            IOUtils.cleanup(LOG, input);

            // Re-throw
            throw ioe;
        }

    }

    private void shuffleToDisk(MapHost host, MapOutput<K, V> mapOutput,
            InputStream input, long compressedLength) throws IOException {
        // Copy data to local-disk
        OutputStream output = mapOutput.getDisk();
        long bytesLeft = compressedLength;
        try {
            final int BYTES_TO_READ = 64 * 1024;
            byte[] buf = new byte[BYTES_TO_READ];
            while (bytesLeft > 0) {
                int n = input.read(buf, 0,
                        (int) Math.min(bytesLeft, BYTES_TO_READ));
                if (n < 0) {
                    throw new IOException("read past end of stream reading "
                            + mapOutput.getMapId());
                }
                output.write(buf, 0, n);
                bytesLeft -= n;
                metrics.inputBytes(n);
                reporter.progress();
            }

            LOG.info("Read " + (compressedLength - bytesLeft)
                    + " bytes from map-output for " + mapOutput.getMapId());

            output.close();
        } catch (IOException ioe) {
            // Close the streams
            IOUtils.cleanup(LOG, input, output);

            // Re-throw
            throw ioe;
        }

        // Sanity check
        if (bytesLeft != 0) {
            throw new IOException("Incomplete map output received for "
                    + mapOutput.getMapId() + " from " + host.getHostName()
                    + " (" + bytesLeft + " bytes missing of "
                    + compressedLength + ")");
        }
    }
    
    /**
     * The crux of the matter...
     * 
     * @param host
     *            {@link MapHost} from which we need to shuffle available
     *            map-outputs.
     */
    private void copyFromHdfs(FetchTask<K,V> task) throws IOException {
        TaskAttemptID mapId = task.getTaskAttemptID();
        if ( LOG.isDebugEnabled() ) {
            LOG.debug("Fetcher " + id + " going to fetch " + mapId + " from HDFS");
        }
        
        long startTime = System.currentTimeMillis();

        MapOutputIndex.Record index = task.getIndex();
        long offset = index.getOffset();
        long compressedLength = index.getPartLength();
        long decompressedLength = index.getRawLength();
        Path inputFile = task.getInputPath(jobOutPath);
        if ( LOG.isDebugEnabled() ) {
            LOG.debug("Fetcher " + id + ": path = "+inputFile+"; bytes = "+index.getPartLength());
        }

        FSDataInputStream input = null;
        MapOutput<K,V> mapOutput = null;
        final ReactiveMapOutput<K,V> rMapOutput = task.getReactiveMapOutput();
        try {
            // Get the location for the map output - either in-memory or on-disk
            // ahem, happy to go disk. :-)
            
            // if the reactive output can fit in memory and we are sure that some memory will become
            // available, i.e., merge is in progress, wait. otherwise, go disk.

            // if the reactive output is too large and never fit in memory, don't bother to go disk.
            
            mapOutput = merger.reserve(mapId, decompressedLength, id);
            // Check if we can shuffle *now* ...
            if (mapOutput.getType() == Type.WAIT) {
                LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
                try {
                    merger.waitForInMemoryMerge();
                } catch ( InterruptedException ignore ) {}
                mapOutput = merger.reserve(mapId, decompressedLength, id);
                if ( mapOutput.getType() == Type.WAIT ) {
                    // we are blocked by the reactive map output. here, decide to go for reactive output
                    LOG.info("fetcher#" + id + " - some output is blocking the way. just commit it!!");
                    scheduler.commitPendingReactiveOutput(); // this should make room for next ones.
                    scheduler.putBackKnownReactiveMapOutput(task);
                    return;
                }
            }

            input = jobFs.open(inputFile);
            input.seek(offset);
            
            task.setMapOutput(mapOutput);
            
            // Go!
            if ( mapOutput.getType() == Type.MEMORY ) {
                LOG.info("fetcher#" + id + " about to shuffle output in HDFS of map "
                        + mapOutput.getMapId() + " decomp: " + decompressedLength
                        + " len: " + compressedLength + " to "
                        + mapOutput.getType());

                shuffleFromHdfsToMemory(null, mapOutput, input, (int)decompressedLength, (int)compressedLength);
            } else {
                LOG.info("fetcher#" + id + " about to shuffle output in HDFS of map "
                        + mapOutput.getMapId() + " decomp: " + decompressedLength
                        + " len: " + compressedLength + " to DISK "
                        + mapOutput.getOutputPath());

                shuffleFromHdfsToDisk(null, mapOutput, input, compressedLength);
            }
            

            // Inform the shuffle scheduler
            long endTime = System.currentTimeMillis();
            
            scheduler.copySucceeded(mapId, task, endTime - startTime);
            metrics.successFetch();
        } catch (IOException ioe) {
            ioErrs.increment(1);
            
            IOUtils.cleanup(LOG, input);

            LOG.info("Failed to shuffle output of " + mapId + " from " + inputFile, ioe);

            // Inform the shuffle-scheduler
            scheduler.copyFailed(mapId, task, true);
            metrics.failedFetch();
        } finally {
            IOUtils.cleanup(LOG, input);
        }
    }
    
    private void shuffleFromHdfsToMemory(Path file, MapOutput<K, V> mapOutput,
            InputStream input, int decompressedLength, int compressedLength)
            throws IOException {
        IFileInputStream checksumIn = new IFileInputStream(input,
                compressedLength);

        input = checksumIn;

        // Are map-outputs compressed?
        if (codec != null) {
            decompressor.reset();
            input = codec.createInputStream(input, decompressor);
        }

        // Copy map-output into an in-memory buffer
        byte[] shuffleData = mapOutput.getMemory();

        try {
            IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
            metrics.inputBytes(shuffleData.length);
            reporter.progress();
            LOG.info("Read " + shuffleData.length
                    + " bytes from map-output in HDFS for " + mapOutput.getMapId());
        } catch (IOException ioe) {
            // Close the streams
            IOUtils.cleanup(LOG, input);

            // Re-throw
            throw ioe;
        }

    }

    private void shuffleFromHdfsToDisk(Path file, MapOutput<K, V> mapOutput,
            InputStream input, long compressedLength) throws IOException {
        // Copy data to local-disk
        OutputStream output = mapOutput.getDisk();
        long bytesLeft = compressedLength;
        try {
            final int BYTES_TO_READ = 64 * 1024;
            byte[] buf = new byte[BYTES_TO_READ];
            while (bytesLeft > 0) {
                int n = input.read(buf, 0,
                        (int) Math.min(bytesLeft, BYTES_TO_READ));
                if (n < 0) {
                    throw new IOException("read past end of stream reading "
                            + mapOutput.getMapId());
                }
                output.write(buf, 0, n);
                bytesLeft -= n;
                metrics.inputBytes(n);
                reporter.progress();
            }

            LOG.info("Read " + (compressedLength - bytesLeft)
                    + " bytes from map-output in HDFS for " + mapOutput.getMapId());

            output.close();
        } catch (IOException ioe) {
            // Close the streams
            IOUtils.cleanup(LOG, input, output);

            // Re-throw
            throw ioe;
        }

        // Sanity check
        if (bytesLeft != 0) {
            throw new IOException("Incomplete map output received for "
                    + mapOutput.getMapId() + " from " + file
                    + " (" + bytesLeft + " bytes missing of "
                    + compressedLength + ")");
        }
    }
}
